{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## MicroModelling with STOs\n",
    "\n",
    "This is a very simple example of how to use STOs (script table operators) in combination with the new tdextensions library to create MicroModels, that is, one model per data partition. The example is contrived: the dataset is synthetic and was generated so that every partition has an identical distribution.\n",
    "\n",
    "This is not how the real world behaves, but we will expand this example over the coming months to show how to deal with unbalanced partitions, partitions that do not have enough data, and similar cases.\n",
    "\n",
    "Note that the methods from the tdextensions library will be included in teradataml at the end of Q1 2021.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdin",
     "output_type": "stream",
     "text": [
      "password ··········\n"
     ]
    }
   ],
   "source": [
    "from teradataml.dataframe.dataframe import DataFrame\n",
    "from teradataml import create_context\n",
    "import getpass\n",
    "\n",
    "engine = create_context(host=\"3.238.151.85\", username=\"AOA_DEMO\", password=getpass.getpass(\"password\"))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Training a Model per Partition\n",
    "\n",
    "We will use the `map_partition` function from tdextensions to seamlessly \"push down\" the Python code that trains a model on the data in a given partition. An independent model is trained for each partition and written as one row in a models table, keyed by model version and partition id. We also record the training metadata for that partition, which lets us track the result of hyper-parameter optimization or anything else that is specific to that model.\n",
    "\n",
    "Behind the scenes, this generates and installs an STO file. The big difference is in the user experience: the user can work \"natively\" in Python to achieve their goals."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting training...\n",
      "Finished training\n"
     ]
    }
   ],
   "source": [
    "from teradataml import create_context\n",
    "from teradataml.dataframe.dataframe import DataFrame\n",
    "from tdextensions.distributed import DistDataFrame, DistMode\n",
    "from sklearn.preprocessing import RobustScaler,OneHotEncoder\n",
    "from sklearn.decomposition import PCA\n",
    "from sklearn.ensemble import RandomForestRegressor\n",
    "from sklearn.compose import ColumnTransformer\n",
    "from sklearn.impute import SimpleImputer\n",
    "from sklearn.pipeline import Pipeline\n",
    "from aoa.sto.util import save_metadata, cleanup_cli\n",
    "\n",
    "import os\n",
    "import numpy as np\n",
    "import json\n",
    "import base64\n",
    "import dill\n",
    "import uuid\n",
    "\n",
    "model_version = str(uuid.uuid4())\n",
    "hyperparams = {\n",
    "    \"max_depth\": 5\n",
    "}\n",
    "data_conf = {\n",
    "    \"table\": \"STO_SYNTHETIC_TRAIN_V\"\n",
    "}\n",
    "\n",
    "def train_partition(partition, model_version, hyperparams):\n",
    "    numeric_features = [\"X\"+str(i) for i in range(1,10)]\n",
    "    for i in numeric_features:\n",
    "        partition[i] = partition[i].astype(\"float\")\n",
    "\n",
    "    numeric_transformer = Pipeline(steps=[\n",
    "        (\"imputer\", SimpleImputer(strategy=\"median\")),\n",
    "        (\"scaler\", RobustScaler()),\n",
    "        (\"pca\",PCA(0.95))\n",
    "    ])\n",
    "\n",
    "    categorical_features = [\"flag\"]\n",
    "    for i in categorical_features:\n",
    "        partition[i] = partition[i].astype(\"category\")\n",
    "\n",
    "    categorical_transformer = Pipeline(steps=[\n",
    "        (\"imputer\", SimpleImputer(strategy=\"constant\", fill_value=0)),\n",
    "        (\"onehot\", OneHotEncoder(handle_unknown=\"ignore\"))])\n",
    "\n",
    "    preprocessor = ColumnTransformer(transformers=[\n",
    "            (\"num\", numeric_transformer, numeric_features),\n",
    "            (\"cat\", categorical_transformer, categorical_features)])\n",
    "\n",
    "    features = numeric_features + categorical_features\n",
    "    pipeline = Pipeline([(\"preprocessor\", preprocessor),\n",
    "                         (\"rf\", RandomForestRegressor(max_depth=hyperparams[\"max_depth\"]))])\n",
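    "    # pass the target as a 1-D Series; a single-column DataFrame makes scikit-learn emit a DataConversionWarning\n",
    "    pipeline.fit(partition[features], partition['Y1'])\n",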
    "    pipeline.features = features\n",
    "\n",
    "    partition_id = partition.partition_ID.iloc[0]\n",
    "    artefact = base64.b64encode(dill.dumps(pipeline))\n",
    "\n",
    "    # record whatever partition-level information you want, such as row counts, data statistics, explainability, etc.\n",
    "    partition_metadata = json.dumps({\n",
    "        \"num_rows\": partition.shape[0],\n",
    "        \"hyper_parameters\": hyperparams\n",
    "    })\n",
    "\n",
    "    return np.array([[partition_id, model_version, partition.shape[0], partition_metadata, artefact]])\n",
    "\n",
    "print(\"Starting training...\")\n",
    "\n",
    "query = \"SELECT * FROM {table} WHERE fold_ID='train'\".format(table=data_conf[\"table\"])\n",
    "df = DistDataFrame(query=query, dist_mode=DistMode.STO, sto_id=\"model_train\")\n",
    "model_df = df.map_partition(lambda partition: train_partition(partition, model_version, hyperparams),\n",
    "                            partition_by=\"partition_id\",\n",
    "                            returns=[[\"partition_id\", \"VARCHAR(255)\"],\n",
    "                                     [\"model_version\", \"VARCHAR(255)\"],\n",
    "                                     [\"num_rows\", \"BIGINT\"],\n",
    "                                     [\"partition_metadata\", \"CLOB\"],\n",
    "                                     [\"model_artefact\", \"CLOB\"]])\n",
    "# materialize as we reuse result\n",
    "model_df = DataFrame(model_df._table_name, materialize=True)\n",
    "\n",
    "# append to models table\n",
    "model_df.to_sql(\"aoa_sto_models\", if_exists=\"append\")\n",
    "\n",
    "print(\"Finished training\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "  partition_id                         model_version  num_rows                                 partition_metadata                                     model_artefact\n",
       "0           12  7bc93a75-fe11-4b17-81a4-c87ffbb85295      7999  {\"num_rows\": 7999, \"hyper_parameters\": {\"max_d...  gANjc2tsZWFybi5waXBlbGluZQpQaXBlbGluZQpxACmBcQ...\n",
       "1            2  7bc93a75-fe11-4b17-81a4-c87ffbb85295      7999  {\"num_rows\": 7999, \"hyper_parameters\": {\"max_d...  gANjc2tsZWFybi5waXBlbGluZQpQaXBlbGluZQpxACmBcQ...\n",
       "2           14  7bc93a75-fe11-4b17-81a4-c87ffbb85295      7999  {\"num_rows\": 7999, \"hyper_parameters\": {\"max_d...  gANjc2tsZWFybi5waXBlbGluZQpQaXBlbGluZQpxACmBcQ...\n",
       "3           10  7bc93a75-fe11-4b17-81a4-c87ffbb85295      7999  {\"num_rows\": 7999, \"hyper_parameters\": {\"max_d...  gANjc2tsZWFybi5waXBlbGluZQpQaXBlbGluZQpxACmBcQ...\n",
       "4            1  7bc93a75-fe11-4b17-81a4-c87ffbb85295      7999  {\"num_rows\": 7999, \"hyper_parameters\": {\"max_d...  gANjc2tsZWFybi5waXBlbGluZQpQaXBlbGluZQpxACmBcQ..."
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "model_df.head(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Evaluating the MicroModels\n",
    "\n",
    "After training, we need to evaluate the model trained on each partition. We will need to do this multiple times on different datasets if we want to track the performance of the models against both existing and newly labelled data.\n",
    "\n",
    "The following example also uses the tdextensions `map_partition` to achieve this. It calculates and returns metrics per partition, which we record against the model version. For metrics that can be aggregated, we also support calculating the average value across all partitions (the AOA does this automatically).\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Finished evaluation\n"
     ]
    }
   ],
   "source": [
    "from sklearn import metrics\n",
    "from aoa.sto.util import save_metadata, save_evaluation_metrics\n",
    "\n",
    "data_conf = {\n",
    "    \"table\": \"STO_SYNTHETIC_TEST_V\"\n",
    "}\n",
    "\n",
    "\n",
    "def eval_partition(partition):\n",
    "    model_artefact = partition.loc[partition['n_row'] == 1, 'model_artefact'].iloc[0]\n",
    "    model = dill.loads(base64.b64decode(model_artefact))\n",
    "\n",
    "    X_test = partition[model.features]\n",
    "    y_test = partition[['Y1']]\n",
    "\n",
    "    y_pred = model.predict(X_test)\n",
    "\n",
    "    partition_id = partition.partition_ID.iloc[0]\n",
    "\n",
    "    # record whatever partition-level information you want, such as row counts, data statistics, metrics, explainability, etc.\n",
    "    partition_metadata = json.dumps({\n",
    "        \"num_rows\": partition.shape[0],\n",
    "        \"metrics\": {\n",
    "            \"MAE\": \"{:.2f}\".format(metrics.mean_absolute_error(y_test, y_pred)),\n",
    "            \"MSE\": \"{:.2f}\".format(metrics.mean_squared_error(y_test, y_pred)),\n",
    "            \"R2\": \"{:.2f}\".format(metrics.r2_score(y_test, y_pred))\n",
    "        }\n",
    "    })\n",
    "\n",
    "    return np.array([[partition_id, partition.shape[0], partition_metadata]])\n",
    "\n",
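    "# attach the model artefact only to the first row (n_row = 1) of each data partition so it can be loaded inside the partition\n",
    "# note: the WHERE filter on m.model_version drops partitions that have no model for this version, so the LEFT JOIN behaves like an inner join\n",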
    "query = f\"\"\"\n",
    "SELECT d.*, CASE WHEN n_row=1 THEN m.model_artefact ELSE null END AS model_artefact \n",
    "    FROM (SELECT x.*, ROW_NUMBER() OVER (PARTITION BY x.partition_id ORDER BY x.partition_id) AS n_row FROM {data_conf[\"table\"]} x) AS d\n",
    "    LEFT JOIN aoa_sto_models m\n",
    "    ON d.partition_id = m.partition_id\n",
    "    WHERE m.model_version = '{model_version}'\n",
    "\"\"\"\n",
    "\n",
    "df = DistDataFrame(query=query, dist_mode=DistMode.STO, sto_id=\"model_eval\")\n",
    "eval_df = df.map_partition(lambda partition: eval_partition(partition),\n",
    "                           partition_by=\"partition_id\",\n",
    "                           returns=[[\"partition_id\", \"VARCHAR(255)\"],\n",
    "                                    [\"num_rows\", \"BIGINT\"],\n",
    "                                    [\"partition_metadata\", \"CLOB\"]])\n",
    "\n",
    "# materialize as we reuse result\n",
    "eval_df = DataFrame(eval_df._table_name, materialize=True)\n",
    "\n",
    "#save_metadata(eval_df)\n",
    "#save_evaluation_metrics(eval_df, [\"MAE\", \"MSE\", \"R2\"])\n",
    "\n",
    "print(\"Finished evaluation\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "  partition_id  num_rows                                 partition_metadata\n",
       "0           12      2001  {\"num_rows\": 2001, \"metrics\": {\"MAE\": \"1.91\", ...\n",
       "1            2      2001  {\"num_rows\": 2001, \"metrics\": {\"MAE\": \"0.75\", ...\n",
       "2           29      2001  {\"num_rows\": 2001, \"metrics\": {\"MAE\": \"0.99\", ...\n",
       "3            4      2001  {\"num_rows\": 2001, \"metrics\": {\"MAE\": \"0.83\", ...\n",
       "4            7      2001  {\"num_rows\": 2001, \"metrics\": {\"MAE\": \"1.43\", ...\n",
       "5            9      2001  {\"num_rows\": 2001, \"metrics\": {\"MAE\": \"0.68\", ...\n",
       "6            6      2001  {\"num_rows\": 2001, \"metrics\": {\"MAE\": \"1.74\", ...\n",
       "7           14      2001  {\"num_rows\": 2001, \"metrics\": {\"MAE\": \"0.71\", ...\n",
       "8           10      2001  {\"num_rows\": 2001, \"metrics\": {\"MAE\": \"0.61\", ...\n",
       "9            1      2001  {\"num_rows\": 2001, \"metrics\": {\"MAE\": \"0.83\", ..."
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "eval_df.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Scoring \n",
    "\n",
    "You have already seen how to perform scoring as part of the evaluation code. However, we can also separate the scoring logic into its own simple function, which can be called independently by a scheduler or by a user on demand.\n",
    "\n",
    "When this code is executed, it stores the model code in Teradata, which allows it to be called later via pure SQL, for example, instead of via the Python code shown here. This provides extra flexibility.\n",
    "\n",
    "Note that the AOA can automatically schedule the scoring logic shown below to execute at a defined interval."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "data_conf = {\n",
    "    \"table\": \"STO_SYNTHETIC_TEST_V\",\n",
    "    \"predictions\": \"STO_SYNTHETIC_PREDICTIONS\"\n",
    "}\n",
    "\n",
    "def score_partition(partition):\n",
    "    model_artefact = partition.loc[partition['n_row'] == 1, 'model_artefact'].iloc[0]\n",
    "    model = dill.loads(base64.b64decode(model_artefact))\n",
    "\n",
    "    X = partition[model.features]\n",
    "\n",
    "    return model.predict(X)\n",
    "\n",
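    "# attach the model artefact only to the first row (n_row = 1) of each data partition so it can be loaded inside the partition\n",
    "# note: the WHERE filter on m.model_version drops partitions that have no model for this version, so the LEFT JOIN behaves like an inner join\n",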
    "query = f\"\"\"\n",
    "SELECT d.*, CASE WHEN n_row=1 THEN m.model_artefact ELSE null END AS model_artefact \n",
    "    FROM (SELECT x.*, ROW_NUMBER() OVER (PARTITION BY x.partition_id ORDER BY x.partition_id) AS n_row FROM {data_conf[\"table\"]} x) AS d\n",
    "    LEFT JOIN aoa_sto_models m\n",
    "    ON d.partition_id = m.partition_id\n",
    "    WHERE m.model_version = '{model_version}'\n",
    "\"\"\"\n",
    "\n",
    "df = DistDataFrame(query=query, dist_mode=DistMode.STO, sto_id=\"my_model_score\")\n",
    "scored_df = df.map_partition(lambda partition: score_partition(partition),\n",
    "                             partition_by=\"partition_id\",\n",
    "                             returns=[[\"prediction\", \"VARCHAR(255)\"]])\n",
    "\n",
    "scored_df.to_sql(data_conf[\"predictions\"], if_exists=\"append\")"
   ]
   }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
